package com.sixbro.data.redis.config;

import com.sixbro.data.redis.annotation.RedisMessageListener;
import com.sixbro.data.redis.service.MessageReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Wires Redis pub/sub listeners into a {@link RedisMessageListenerContainer}.
 * <p>
 * Two mechanisms are combined:
 * <ul>
 *   <li>three statically declared {@link MessageListenerAdapter} beans that delegate to
 *       {@link MessageReceiver} and are subscribed when the container bean is created;</li>
 *   <li>a startup scan ({@link #run(ApplicationArguments)}) that subscribes every
 *       single-argument bean method annotated with {@link RedisMessageListener}.</li>
 * </ul>
 *
 * @author Mr.Lu
 * @since 2021-12-18 16:06
 */
@Component
public class RedisMessageListenerRegistry implements ApplicationRunner {
    /**
     * Topic to which the {@code listenerAdapter} bean is subscribed.
     */
    public static final String SUB_KEY = "chat";

    /**
     * Sequence used to derive a unique bean name for every dynamically registered adapter.
     */
    private final AtomicLong counter = new AtomicLong(0);

    @Autowired
    private ApplicationContext context;

    /**
     * Creates the Redis message listener container and subscribes the three statically
     * declared adapters. Each adapter invokes, by reflection, the handler method it was
     * constructed with.
     *
     * @param connectionFactory factory the container uses to obtain Redis connections
     * @param onMsg             adapter subscribed to the {@code "onMsg"} topic
     * @param onMessage         adapter subscribed to the {@code "onMessage"} topic
     * @param listenerAdapter   adapter subscribed to the {@link #SUB_KEY} topic
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter onMsg, MessageListenerAdapter onMessage, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // Bind each callback adapter to the topic it listens on.
        container.addMessageListener(onMsg, new PatternTopic("onMsg"));
        container.addMessageListener(onMessage, new PatternTopic("onMessage"));
        container.addMessageListener(listenerAdapter, new PatternTopic(SUB_KEY));
        return container;
    }

    /**
     * Scans every bean in the application context for methods annotated with
     * {@link RedisMessageListener} that take exactly one parameter, wraps each one in a
     * {@link MessageListenerAdapter} and subscribes it to the topic named by the annotation.
     * <p>
     * Beans whose type the context cannot determine are skipped. A handler that is visited
     * more than once for the same bean (an overriding method in a subclass, including a
     * generated proxy subclass, plus the overridden method in its superclass) is
     * registered only once.
     *
     * @param args application arguments; not used
     * @throws IllegalStateException if a listener method is found but the application
     *                               context does not support runtime bean registration
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        RedisMessageListenerContainer container = context.getBean(RedisMessageListenerContainer.class);

        for (String beanName : context.getBeanNamesForType(Object.class)) {
            Class<?> beanType = context.getType(beanName);
            if (beanType == null) {
                // The context could not determine this bean's type; nothing to scan.
                continue;
            }

            // doWithMethods walks the class hierarchy and reports an overridden method once
            // per declaring class, so remember which handler signatures were already bound.
            Set<String> registeredSignatures = new HashSet<>();
            ReflectionUtils.doWithMethods(beanType,
                    method -> {
                        if (!registeredSignatures.add(signatureOf(method))) {
                            return;
                        }
                        ReflectionUtils.makeAccessible(method);
                        Object target = context.getBean(beanName);
                        RedisMessageListener annotation = AnnotationUtils.findAnnotation(method, RedisMessageListener.class);
                        MessageListenerAdapter adapter = registerBean(requireGenericContext(), target, method);
                        container.addMessageListener(adapter, new PatternTopic(annotation.topic()));
                    },
                    RedisMessageListenerRegistry::isListenerMethod);
        }
    }

    /**
     * Tells whether a method qualifies as an annotation-driven listener: not synthetic,
     * exactly one parameter, and carrying {@link RedisMessageListener}.
     */
    private static boolean isListenerMethod(Method method) {
        return !method.isSynthetic()
                && method.getParameterTypes().length == 1
                && AnnotationUtils.findAnnotation(method, RedisMessageListener.class) != null;
    }

    /**
     * Builds the de-duplication key for a single-parameter listener method.
     */
    private static String signatureOf(Method method) {
        return method.getName() + '(' + method.getParameterTypes()[0].getName() + ')';
    }

    /**
     * Returns the injected context as a {@link GenericApplicationContext}, which is needed
     * to register adapter beans at runtime.
     *
     * @throws IllegalStateException if the context is of another type
     */
    private GenericApplicationContext requireGenericContext() {
        if (!(context instanceof GenericApplicationContext)) {
            throw new IllegalStateException(
                    "Registering @RedisMessageListener methods requires a GenericApplicationContext but found "
                            + context.getClass().getName());
        }
        return (GenericApplicationContext) context;
    }

    /**
     * Registers a new {@link MessageListenerAdapter} bean that delegates to
     * {@code method} (looked up by name) on {@code target}, then returns the instance
     * obtained from the context.
     *
     * @param context context in which the adapter bean is registered
     * @param target  bean instance that owns the handler method
     * @param method  handler method; only its name is passed to the adapter
     * @return the adapter bean retrieved from the context
     */
    private MessageListenerAdapter registerBean(GenericApplicationContext context, Object target, Method method) {
        String containerBeanName = String.format("%s_%s", MessageListenerAdapter.class.getName(), counter.incrementAndGet());
        context.registerBean(containerBeanName, MessageListenerAdapter.class, () -> new MessageListenerAdapter(target, method.getName()));
        return context.getBean(containerBeanName, MessageListenerAdapter.class);
    }

    /**
     * Binds the receiver to its {@code onMsg} handler method.
     *
     * @param receiver delegate that handles the message
     * @return adapter that calls {@code onMsg} on the receiver when a message arrives
     */
    @Bean
    public MessageListenerAdapter onMsg(MessageReceiver receiver){
        // "onMsg" must match a handler method on MessageReceiver (declared outside this file).
        return new MessageListenerAdapter(receiver,"onMsg");
    }

    /**
     * Binds the receiver to its {@code onMessage} handler method.
     *
     * @param receiver delegate that handles the message
     * @return adapter that calls {@code onMessage} on the receiver when a message arrives
     */
    @Bean
    public MessageListenerAdapter onMessage(MessageReceiver receiver){
        // "onMessage" must match a handler method on MessageReceiver (declared outside this file).
        return new MessageListenerAdapter(receiver,"onMessage");
    }

    /**
     * Binds the receiver to its {@code receiveMessage} handler method.
     *
     * @param receiver delegate that handles the message
     * @return adapter that calls {@code receiveMessage} on the receiver when a message arrives
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}
